
Implement option to use dask to do image co-addition #388

Draft · wants to merge 6 commits into main
Conversation

@astrofrog (Member) commented Sep 11, 2023

This is an experiment for now but it allows:

  • N-dimensional reprojection/co-addition
  • combine_function='median' (not implemented here, but trivial to add)
  • reproject_and_coadd to optionally return a dask array (return_type='dask'), which means the computation of the co-addition is actually delayed. You can extract a cutout from a co-added mosaic and compute just that, without ever having to compute the full mosaic.

At the moment, setting block_size is what toggles between the old code and the new dask-backed co-addition.

Still a work in progress; I'm currently looking into how to get the parallel mode to work correctly (and actually be fast).

@keflavich - would you be able to try this out with some of your data? To use this, you just need to set e.g. block_size=(100, 100, 100) (or whatever you think makes sense) when calling the reproject_and_coadd function. For now, I am still investigating performance with the parallel mode, so I don't recommend trying to set parallel=... yet (though you technically can). I'm just curious to know at this point if it even gets close to working for you.
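
For reference, here is a minimal sketch of the kind of call described above (the input data, output WCS, and output shape are hypothetical placeholders; block_size and return_type are the keywords this PR adds):

    from reproject import reproject_interp
    from reproject.mosaicking import reproject_and_coadd

    # input_data, wcs_out and shape_out are assumed to exist, e.g. a list of
    # (array, WCS) pairs plus the target WCS and shape for the mosaic
    mosaic, footprint = reproject_and_coadd(
        input_data,
        output_projection=wcs_out,
        shape_out=shape_out,
        reproject_function=reproject_interp,
        combine_function="mean",
        block_size=(100, 100, 100),  # setting this toggles the dask-backed path
        return_type="dask",          # return a lazy dask array, not numpy
    )

    # Nothing has been computed yet; a cutout can be materialized without
    # ever computing the full mosaic:
    cutout = mosaic[:, 500:600, 500:600].compute()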

codecov bot commented Sep 11, 2023

Codecov Report

Attention: Patch coverage is 65.00000%, with 49 lines in your changes missing coverage. Please review.

Project coverage is 89.36%. Comparing base (2dedbc5) to head (e63d249).
Report is 3 commits behind head on main.

Files                          Patch %   Lines
reproject/mosaicking/coadd.py  64.74%    49 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #388      +/-   ##
==========================================
- Coverage   93.60%   89.36%   -4.25%     
==========================================
  Files          25       25              
  Lines         892      959      +67     
==========================================
+ Hits          835      857      +22     
- Misses         57      102      +45     


@@ -16,6 +16,7 @@

@delayed(pure=True)
def as_delayed_memmap_path(array, tmp_dir):
    tmp_dir = tempfile.mkdtemp()  # FIXME
@astrofrog (Member Author):

I'm not sure why I didn't catch this before, but I ran into an issue where the temporary directory no longer exists by the time the array is computed out of dask when using return_type='dask'.

Contributor:

Yikes - are you saying that this is a problem with the current implementation, or that this line fixes it?

@astrofrog (Member Author):

This is a temporary fix for this PR but I need to figure out a proper solution. It is a problem in main.
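
To illustrate why this bites (a minimal, hypothetical sketch, not the PR's actual code): if the temporary directory is created and cleaned up eagerly at graph-construction time, any delayed task that memmaps a file inside it fails once the graph is finally computed. Creating the directory inside the delayed function, as the # FIXME line above does, sidesteps this, at the cost of leaking the directory:

    import os
    import tempfile

    import numpy as np
    from dask import delayed

    def build_delayed_memmap(array):
        # BUG: the temporary directory is removed when this 'with' block
        # exits, long before the delayed task below is ever computed.
        with tempfile.TemporaryDirectory() as tmp_dir:
            path = os.path.join(tmp_dir, "block.dat")
            mm = np.memmap(path, dtype=array.dtype, shape=array.shape, mode="w+")
            mm[:] = array
            mm.flush()
            return delayed(np.memmap)(path, dtype=array.dtype,
                                      shape=array.shape, mode="r")

    lazy = build_delayed_memmap(np.arange(10.0))
    lazy.compute()  # FileNotFoundError: the file vanished with tmp_dir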

@astrofrog (Member Author) commented Sep 11, 2023

OK, so the parallel option seems to work provided the block size is reasonable (not too big), but I'm only seeing a 2-3x speedup.
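
In case anyone wants to poke at the scheduling themselves, a hypothetical way to compare schedulers on the lazily-built result (mosaic is assumed to be the dask array returned with return_type='dask'):

    import time

    import dask

    # Compare dask's single-threaded, threaded and multiprocess schedulers
    # on the same graph; useful for seeing where the 2-3x ceiling comes from.
    for scheduler in ["synchronous", "threads", "processes"]:
        start = time.perf_counter()
        dask.compute(mosaic, scheduler=scheduler)
        print(f"{scheduler}: {time.perf_counter() - start:.1f} s")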

@keflavich (Contributor):

I get the following traceback when trying on my big cubes:

Writing
[                                        ] | 0% Completed | 548.30 ms
Traceback (most recent call last):
  File "<ipython-input-3-9471e55417d0>", line 1, in <module>
    make_downsampled_cube(f'{basepath}/mosaics/cubes/CS21_CubeMosaic.fits',
  File "/orange/adamginsburg/ACES/reduction_ACES/aces/imaging/make_mosaic.py", line 721, in make_downsampled_cube
    dscube.write(outcubename, overwrite=overwrite)
  File "/blue/adamginsburg/adamginsburg/repos/spectral-cube/spectral_cube/dask_spectral_cube.py", line 1382, in write
    super().write(*args, **kwargs)
  File "/blue/adamginsburg/adamginsburg/repos/spectral-cube/spectral_cube/io/core.py", line 131, in __call__
    registry.write(self._instance, *args, **kwargs)
  File "/blue/adamginsburg/adamginsburg/repos/astropy/astropy/io/registry/compat.py", line 52, in wrapper
    return getattr(registry, method_name)(*args, **kwargs)
  File "/blue/adamginsburg/adamginsburg/repos/astropy/astropy/io/registry/core.py", line 383, in write
    return writer(data, *args, **kwargs)
  File "/blue/adamginsburg/adamginsburg/repos/spectral-cube/spectral_cube/io/fits.py", line 280, in write_fits_cube
    hdulist.writeto(filename, overwrite=overwrite)
  File "/blue/adamginsburg/adamginsburg/repos/astropy/astropy/io/fits/hdu/hdulist.py", line 1021, in writeto
    hdu._writeto(hdulist._file)
  File "/blue/adamginsburg/adamginsburg/repos/astropy/astropy/io/fits/hdu/base.py", line 710, in _writeto
    self._writeto_internal(fileobj, inplace, copy)
  File "/blue/adamginsburg/adamginsburg/repos/astropy/astropy/io/fits/hdu/base.py", line 716, in _writeto_internal
    data_offset, data_size = self._writedata(fileobj)
  File "/blue/adamginsburg/adamginsburg/repos/astropy/astropy/io/fits/hdu/base.py", line 647, in _writedata
    size += self._writedata_internal(fileobj)
  File "/blue/adamginsburg/adamginsburg/repos/astropy/astropy/io/fits/hdu/image.py", line 649, in _writedata_internal
    return self._writeinternal_dask(fileobj)
  File "/blue/adamginsburg/adamginsburg/repos/astropy/astropy/io/fits/hdu/image.py", line 740, in _writeinternal_dask
    output.store(outarr, lock=True, compute=True)
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/array/core.py", line 1770, in store
    r = store([self], [target], **kwargs)
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/array/core.py", line 1238, in store
    compute_as_if_collection(Array, store_dsk, map_keys, **kwargs)
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/base.py", line 342, in compute_as_if_collection
    return schedule(dsk2, keys, **kwargs)
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/local.py", line 557, in get_sync
    return get_async(
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/local.py", line 500, in get_async
    for key, res_info, failed in queue_get(queue).result():
  File "/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/concurrent/futures/_base.py", line 439, in result
    return self.__get_result()
  File "/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/local.py", line 542, in submit
    fut.set_result(fn(*args, **kwargs))
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/local.py", line 238, in batch_execute_tasks
    return [execute_task(*a) for a in it]
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/local.py", line 238, in <listcomp>
    return [execute_task(*a) for a in it]
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/local.py", line 229, in execute_task
    result = pack_exception(e, dumps)
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/local.py", line 224, in execute_task

  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/core.py", line 119, in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/core.py", line 119, in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/array/core.py", line 122, in getter
    c = a[b]
  File "/blue/adamginsburg/adamginsburg/repos/spectral-cube/spectral_cube/dask_spectral_cube.py", line 199, in __getitem__
    return self._mask._filled(data=self._data,
  File "/blue/adamginsburg/adamginsburg/repos/spectral-cube/spectral_cube/masks.py", line 240, in _filled
    return np.ma.masked_array(sliced_data, mask=ex).filled(fill)
  File "/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/numpy/ma/core.py", line 2826, in __new__
    _data = np.array(data, dtype=dtype, copy=copy,
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/array/core.py", line 1702, in __array__
    x = self.compute()
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/base.py", line 315, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/base.py", line 600, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/local.py", line 557, in get_sync
    return get_async(
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/local.py", line 500, in get_async
    for key, res_info, failed in queue_get(queue).result():
  File "/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/concurrent/futures/_base.py", line 439, in result
    return self.__get_result()
  File "/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/local.py", line 542, in submit
    fut.set_result(fn(*args, **kwargs))
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/local.py", line 238, in batch_execute_tasks
    return [execute_task(*a) for a in it]
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/local.py", line 238, in <listcomp>
    return [execute_task(*a) for a in it]
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/local.py", line 229, in execute_task
    result = pack_exception(e, dumps)
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/local.py", line 224, in execute_task
    result = _execute_task(task, data)
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/core.py", line 119, in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/array/core.py", line 1284, in finalize
    return concatenate3(results)
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/array/core.py", line 5289, in concatenate3
    result = np.empty(shape=shape, dtype=dtype(deepfirst(arrays)))
MemoryError: Unable to allocate 634. GiB for an array with shape (300, 10200, 27800) and data type >f8

I might have copy-pasted the traceback a bit incorrectly; it's very deep.
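
As a quick sanity check on that figure: an array of shape (300, 10200, 27800) in float64 (>f8) really does come to about 634 GiB:

    >>> round(300 * 10200 * 27800 * 8 / 1024**3, 1)  # elements x 8 bytes, in GiB
    633.8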

@astrofrog (Member Author):

@keflavich - just to check, is that how big you would expect the output cube to be? Or is that incorrect? What is the command you are using?

If it is what you expect, have you tried using return_type='dask'? Alternatively, you could try passing a memory-mapped array as output_array.
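
For concreteness, a sketch of that second suggestion (the filename and dtype are placeholders; output_array is the reproject_and_coadd keyword mentioned above):

    import numpy as np

    # Pre-allocate the output on disk so the full mosaic never has to fit
    # in RAM; the co-addition can then fill it in place.
    shape_out = (300, 10200, 27800)  # example output shape
    output = np.memmap("mosaic.dat", dtype="<f4", mode="w+", shape=shape_out)

    # ...then pass output_array=output to reproject_and_coadd.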

@keflavich (Contributor):

That's the size of the input cube, not the output.

The call was this:

dscube = cube.reproject(hdr, return_type='dask', filled=False, parallel=True, block_size=[1,1000,1000])

Yes, passing a memmapped array as output sounds like a good idea - I can try that next.

@astrofrog (Member Author):

Which branch of spectral-cube are you using?

@keflavich (Contributor):

I'm on cubewcs_mosaic_and_dask_reproject, https://github.com/keflavich/spectral-cube/tree/cubewcs_mosaic_and_dask_reproject. It's not really a stable branch... I'm doing all sorts of experiments in parallel and getting a bit lost along the way.
